package org.hope.lee.filter;

import java.io.UnsupportedEncodingException;
import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.message.MessageExt;

public class FilterCustomer {
	public static void main(String[] args) throws MQClientException {
		String group_name = "filter_consumer";
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
        consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
        // 使用Java代码，在服务器做消息过滤 
        String filterCode = MixAll.file2String("E:\\code-on-oschina\\hzjsd1108sohu\\RocketMQ-learn\\rocketmq-api\\src\\main\\java\\org\\hope\\lee\\filter\\MessageFilterImpl.java");
        System.out.println(filterCode);
        consumer.subscribe("TopicFilter7", "org.hope.lee.filter.MessageFilterImpl", filterCode);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                //System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                try {
                    System.out.println(new String(msgs.get(0).getBody(),"utf-8"));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
	}
}
